bitkeeper revision 1.1236.43.12 (4244309dO7HfNtv-R6F5S3jdQqQR8A)
authorjrb44@plym.cl.cam.ac.uk <jrb44@plym.cl.cam.ac.uk>
Fri, 25 Mar 2005 15:39:09 +0000 (15:39 +0000)
committerjrb44@plym.cl.cam.ac.uk <jrb44@plym.cl.cam.ac.uk>
Fri, 25 Mar 2005 15:39:09 +0000 (15:39 +0000)
Enhanced concurrency support in blockstore.

Signed-off-by: James Bulpin <James.Bulpin@cl.cam.ac.uk>
tools/blktap/Makefile
tools/blktap/blktaplib.c
tools/blktap/blockstore.c
tools/blktap/parallax-threaded.h

index 7f71a219bf763dd8eff4a5e35eed31bece14ac51..3478552ac424a323cee7ccc899ac867ffce7e98b 100644 (file)
@@ -58,7 +58,7 @@ OBJS     = $(patsubst %.c,%.o,$(SRCS))
 
 LIB      = libblktap.so libblktap.so.$(MAJOR) libblktap.so.$(MAJOR).$(MINOR)
 
-all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd $(VDI_TOOLS) parallax 
+all: mk-symlinks blkdump blkcow blkimg blkcowimg blkgnbd blkcowgnbd $(VDI_TOOLS) parallax parallax-threaded blockstored
        $(MAKE) $(LIB)
 
 LINUX_ROOT := $(wildcard $(XEN_ROOT)/linux-2.6.*-xen-sparse)
@@ -120,42 +120,42 @@ blkaio: $(LIB) blkaio.c blkaiolib.c
        $(CC) $(CFLAGS) -o blkaio -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap blkaio.c blkaiolib.c -laio -lpthread
 
 parallax: $(LIB) $(PLX_SRCS)
-       $(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap $(PLX_SRCS) libgnbd/libgnbd.a
+       $(CC) $(CFLAGS) -o parallax -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lblktap -lpthread $(PLX_SRCS) libgnbd/libgnbd.a
 
 parallax-threaded: $(LIB) $(PLXT_SRCS)
        $(CC) $(CFLAGS) -o parallax-threaded -L$(XEN_LIBXC) -L$(XEN_LIBXUTIL) -L. -lpthread -lblktap $(PLXT_SRCS) libgnbd/libgnbd.a
 
 vdi_test: $(LIB) $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_test -DVDI_STANDALONE -lpthread $(VDI_SRCS)
 
 vdi_list: $(LIB) vdi_list.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_list vdi_list.c -lpthread $(VDI_SRCS)
 
 vdi_create: $(LIB) vdi_create.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_create vdi_create.c -lpthread $(VDI_SRCS)
 
 vdi_snap: $(LIB) vdi_snap.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_snap vdi_snap.c -lpthread $(VDI_SRCS)
 
 vdi_snap_list: $(LIB) vdi_snap_list.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_snap_list vdi_snap_list.c -lpthread $(VDI_SRCS)
 
 vdi_snap_delete: $(LIB) vdi_snap_delete.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_snap_delete vdi_snap_delete.c -lpthread $(VDI_SRCS)
 
 vdi_tree: $(LIB) vdi_tree.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_tree vdi_tree.c -lpthread $(VDI_SRCS)
 
 vdi_fill: $(LIB) vdi_fill.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_fill vdi_fill.c -lpthread $(VDI_SRCS)
 
 vdi_validate: $(LIB) vdi_validate.c $(VDI_SRCS)
-       $(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c $(VDI_SRCS)
+       $(CC) $(CFLAGS) -g3 -o vdi_validate vdi_validate.c -lpthread $(VDI_SRCS)
 
 blockstored: blockstored.c
-       $(CC) $(CFLAGS) -g3 -o blockstored blockstored.c
+       $(CC) $(CFLAGS) -g3 -o blockstored -lpthread blockstored.c
 bstest: bstest.c blockstore.c
-       $(CC) $(CFLAGS) -g3 -o bstest bstest.c blockstore.c
+       $(CC) $(CFLAGS) -g3 -o bstest bstest.c -lpthread blockstore.c
 
 .PHONY: TAGS clean install mk-symlinks rpm
 TAGS:
index 35b893f677d347df0a5efb32b0f2ae255401dbdd..87b680d2cc64d8b0859d3074df76c8fc211835d1 100644 (file)
@@ -248,12 +248,21 @@ static void apply_rsp_hooks(blkif_response_t *rsp)
     }
 }
 
+static pthread_mutex_t push_mutex = PTHREAD_MUTEX_INITIALIZER;
+
 void blktap_inject_response(blkif_response_t *rsp)
 {
+    
     apply_rsp_hooks(rsp);
+    
     write_rsp_to_fe_ring(rsp);
+    
+    pthread_mutex_lock(&push_mutex);
+    
     RING_PUSH_RESPONSES(&fe_ring);
     ioctl(fd, BLKTAP_IOCTL_KICK_FE);
+    
+    pthread_mutex_unlock(&push_mutex);
 }
 
 /*-----[ Polling fd listeners ]------------------------------------------*/
@@ -449,7 +458,9 @@ int blktap_listen(void)
             }
             /* Using this as a unidirectional ring. */
             ctrl_ring.req_cons = ctrl_ring.rsp_prod_pvt = i;
+pthread_mutex_lock(&push_mutex);
             RING_PUSH_RESPONSES(&ctrl_ring);
+pthread_mutex_unlock(&push_mutex);
             
             /* empty the fe_ring */
             notify_fe = 0;
@@ -517,14 +528,18 @@ int blktap_listen(void)
 
             if (notify_be) {
                 DPRINTF("notifying be\n");
+pthread_mutex_lock(&push_mutex);
                 RING_PUSH_REQUESTS(&be_ring);
                 ioctl(fd, BLKTAP_IOCTL_KICK_BE);
+pthread_mutex_unlock(&push_mutex);
             }
 
             if (notify_fe) {
                 DPRINTF("notifying fe\n");
+pthread_mutex_lock(&push_mutex);
                 RING_PUSH_RESPONSES(&fe_ring);
                 ioctl(fd, BLKTAP_IOCTL_KICK_FE);
+pthread_mutex_unlock(&push_mutex);
             }
         }        
     }
index 5de2a6885aa566c05af9be04c40ec564d5d87141..36903fe09e5ec669c3365cfdbeb5d0034f68d36c 100644 (file)
 #include <string.h>
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/time.h>
 #include <stdarg.h>
 #include "blockstore.h"
 #include <pthread.h>
 #include "parallax-threaded.h"
 
 #define BLOCKSTORE_REMOTE
-#define BSDEBUG
+//#define BSDEBUG
+
+#define RETRY_TIMEOUT 1000000 /* microseconds */
 
 /*****************************************************************************
  * Debugging
@@ -62,6 +65,37 @@ bscluster_t bsclusters[MAX_CLUSTERS];
 struct sockaddr_in sin_local;
 int bssock = 0;
 
+/*****************************************************************************
+ * Notification                                                              *
+ *****************************************************************************/
+
+typedef struct pool_thread_t_struct {
+    pthread_mutex_t ptmutex;
+    pthread_cond_t ptcv;
+    int newdata;
+} pool_thread_t;
+
+pool_thread_t pool_thread[READ_POOL_SIZE+1];
+
+#define RECV_NOTIFY(tid) { \
+    pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \
+    pool_thread[tid].newdata = 1; \
+    DB("CV Waking %u", tid); \
+    pthread_cond_signal(&(pool_thread[tid].ptcv)); \
+    pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); }
+#define RECV_AWAIT(tid) { \
+    pthread_mutex_lock(&(pool_thread[tid].ptmutex)); \
+    if (pool_thread[tid].newdata) { \
+        pool_thread[tid].newdata = 0; \
+        DB("CV Woken %u", tid); \
+    } \
+    else { \
+        DB("CV Waiting %u", tid); \
+        pthread_cond_wait(&(pool_thread[tid].ptcv), \
+                          &(pool_thread[tid].ptmutex)); \
+    } \
+    pthread_mutex_unlock(&(pool_thread[tid].ptmutex)); }
+
 /*****************************************************************************
  * Message queue management                                                  *
  *****************************************************************************/
@@ -76,23 +110,6 @@ pthread_mutex_t ptmutex_recv;
 #define ENTER_RECV_CR pthread_mutex_lock(&ptmutex_recv)
 #define LEAVE_RECV_CR pthread_mutex_unlock(&ptmutex_recv)
 
-int notify = 0;
-pthread_mutex_t ptmutex_notify;
-pthread_cond_t ptcv_notify;
-#define RECV_NOTIFY { \
-    pthread_mutex_lock(&ptmutex_notify); \
-    notify = 1; \
-    pthread_cond_signal(&ptcv_notify); \
-    pthread_mutex_unlock(&ptmutex_notify); }
-#define RECV_AWAIT { \
-    pthread_mutex_lock(&ptmutex_notify); \
-    if (notify) \
-        notify = 0; \
-    else \
-        pthread_cond_wait(&ptcv_notify, &ptmutex_notify); \
-    pthread_mutex_unlock(&ptmutex_notify); }
-    
-
 /* A message queue entry. We allocate one of these for every request we send.
  * Asynchronous reply reception also used one of these.
  */
@@ -104,6 +121,8 @@ typedef struct bsq_t_struct {
     int length;
     struct msghdr msghdr;
     struct iovec iov[2];
+    int tid;
+    struct timeval tv_sent;
     bshdr_t message;
     void *block;
 } bsq_t;
@@ -267,11 +286,13 @@ int send_message(bsq_t *qe) {
     qe->message.luid = new_luid();
 
     qe->status = 0;
+    qe->tid = (int)pthread_getspecific(tid_key);
     if (enqueue(qe) < 0) {
         fprintf(stderr, "Error enqueuing request.\n");
         return -1;
     }
 
+    gettimeofday(&(qe->tv_sent), NULL);
     DB("send_message to %d luid=%016llx\n", qe->server, qe->message.luid);
     rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT);
     //rc = sendto(bssock, (void *)&(qe->message), qe->length, 0,
@@ -407,6 +428,7 @@ void recv_recycle_buffer(bsq_t *q) {
 int wait_recv(bsq_t **reqs, int numreqs) {
     bsq_t *q, *m;
     unsigned int x, i;
+    int tid = (int)pthread_getspecific(tid_key);
 
     DB("ENTER wait_recv %u\n", numreqs);
 
@@ -420,7 +442,7 @@ int wait_recv(bsq_t **reqs, int numreqs) {
         return numreqs;
     }
 
-    RECV_AWAIT;
+    RECV_AWAIT(tid);
 
     /*
     rxagain:
@@ -442,6 +464,52 @@ int wait_recv(bsq_t **reqs, int numreqs) {
 
 }
 
+/* retry
+ */
+static int retry_count = 0;
+int retry(bsq_t *qe)
+{
+    int rc;
+    gettimeofday(&(qe->tv_sent), NULL);
+    DB("retry to %d luid=%016llx\n", qe->server, qe->message.luid);
+    retry_count++;
+    rc = sendmsg(bssock, &(qe->msghdr), MSG_DONTWAIT);
+    if (rc < 0)
+        return rc;
+    return 0;
+}
+
+/* queue runner
+ */
+void *queue_runner(void *arg)
+{
+    for (;;) {
+        struct timeval now;
+        long long nowus, sus;
+        bsq_t *q;
+        int r;
+
+        sleep(1);
+
+        gettimeofday(&now, NULL);
+        nowus = now.tv_usec + now.tv_sec * 1000000;
+        ENTER_QUEUE_CR;
+        r = retry_count;
+        for (q = bs_head; q; q = q->next) {
+            sus = q->tv_sent.tv_usec + q->tv_sent.tv_sec * 1000000;
+            if ((nowus - sus) > RETRY_TIMEOUT) {
+                if (retry(q) < 0) {
+                    fprintf(stderr, "Error on sendmsg retry.\n");
+                }
+            }
+        }
+        if (r != retry_count) {
+            fprintf(stderr, "RETRIES: %u %u\n", retry_count - r, retry_count);
+        }
+        LEAVE_QUEUE_CR;
+    }
+}
+
 /* receive loop
  */
 void *receive_loop(void *arg)
@@ -461,7 +529,7 @@ void *receive_loop(void *arg)
             }
             else {
                 DB("RX MATCH");
-                RECV_NOTIFY;
+                RECV_NOTIFY(m->tid);
             }
         }
     }
@@ -1146,8 +1214,12 @@ int __init_blockstore(void)
     pthread_mutex_init(&ptmutex_queue, NULL);
     pthread_mutex_init(&ptmutex_luid, NULL);
     pthread_mutex_init(&ptmutex_recv, NULL);
-    pthread_mutex_init(&ptmutex_notify, NULL);
-    pthread_cond_init(&ptcv_notify, NULL);
+    /*pthread_mutex_init(&ptmutex_notify, NULL);*/
+    for (i = 0; i <= READ_POOL_SIZE; i++) {
+        pool_thread[i].newdata = 0;
+        pthread_mutex_init(&(pool_thread[i].ptmutex), NULL);
+        pthread_cond_init(&(pool_thread[i].ptcv), NULL);
+    }
 
     bsservers[0].hostname = "firebug.cl.cam.ac.uk";
     bsservers[1].hostname = "planb.cl.cam.ac.uk";
@@ -1225,6 +1297,7 @@ int __init_blockstore(void)
     }
 
     pthread_create(&pthread_recv, NULL, receive_loop, NULL);
+    pthread_create(&pthread_recv, NULL, queue_runner, NULL);
 
 #else /* /BLOCKSTORE_REMOTE */
     block_fp = open("blockstore.dat", O_RDWR | O_CREAT | O_LARGEFILE, 0644);
@@ -1262,9 +1335,14 @@ int __init_blockstore(void)
 
 void __exit_blockstore(void)
 {
+    int i;
     pthread_mutex_destroy(&ptmutex_recv);
     pthread_mutex_destroy(&ptmutex_luid);
     pthread_mutex_destroy(&ptmutex_queue);
-    pthread_mutex_destroy(&ptmutex_notify);
-    pthread_cond_destroy(&ptcv_notify);
+    /*pthread_mutex_destroy(&ptmutex_notify);
+      pthread_cond_destroy(&ptcv_notify);*/
+    for (i = 0; i <= READ_POOL_SIZE; i++) {
+        pthread_mutex_destroy(&(pool_thread[i].ptmutex));
+        pthread_cond_destroy(&(pool_thread[i].ptcv));
+    }
 }
index 17cdcb983e5e1a987f7d71a63720f5e05019b44e..de39609fcc081b1c149d3b60192f41b124e7004a 100644 (file)
@@ -14,7 +14,8 @@
 #define NOTHREADS
 #endif
 
-#define READ_POOL_SIZE 128
+//#define READ_POOL_SIZE 128
+#define READ_POOL_SIZE 8
 
 /* per-thread identifier */
 pthread_key_t tid_key;